Vertex AI PipelinesからBigQueryMLで機械学習モデルの訓練を行い予測を実行する
データアナリティクス事業本部 機械学習チームの鈴木です。
この記事は、ブログリレー『Google CloudのAI/MLとかなんとか』の2本目の記事になります。
Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成しましたので、サンプルをご紹介します。
この記事について
『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』では、Vertex AI Pipelines向けのBigQueryおよびBQMLコンポーネントが紹介されていました。
このコンポーネントを使用して、Vertex AI PipelinesでBQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインの例をご紹介します。
BigQueryで一連の処理を行うために、Google Cloudではオーケストレーターとしていくつかの選択肢があると思います。
例えば以下のようなものが考えられます。
- Cloud Composer
- Vertex AI Pipelines
- Workflows
- BigQueryのスケジュールクエリ
- Compute Engineに独自のワークフローエンジンを構築
このうち、Vertex AI Pipelinesはポータビリティに優位性があり、Vertex AIのメタデータ管理の恩恵を受けることもできます。
Vertex AI PipelinesとBigQuery MLコンポーネントについて
Vertex AI Pipelinesについて
Vertex AI Pipelinesは、Kubeflow Pipelines SDKまたはTensorFlow Extendedを使用して構築されたパイプラインを実行できる、サーバーレスなMLワークフローのオーケストレーションサービスです。
ブログリレー1本目の記事である、以下の記事でご紹介しました。
BigQuery MLコンポーネントについて
BigQuery MLコンポーネントは、Google Cloud Pipeline Components(GCPC)のコンポーネントの一つです。GCPCは、Google Cloud Vertex AI PipelinesなどKFP準拠のパイプライン実行バックエンドで実行できる、事前定義されたKFPコンポーネントを提供します。GCPCのコンポーネントはKubeflow Pipelines SDKを使用して、パイプラインに組み込むことができます。
BigQuery MLコンポーネントの一覧は以下のガイドにあります。各コンポーネントの詳細が確認できるので、記載された仕様を調べつつ機械学習パイプラインを実装することになります。
パイプラインの実装
1. 前提
今回は、『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』で紹介されていた『google_cloud_pipeline_components_bqml_text.ipynb』を参考に、既にBigQueryに格納されているテーブルデータに対して、BQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインを作成しました。
コードはこのノートブックの内容を引用・改変したものです。このノートブックのライセンスはApache-2.0 license
であり、このブログに記載のコードもそれにしたがうことにご留意ください。
2. Python環境の準備
作業用ディレクトリを作成し、pyenvで以下のバージョンを指定して仮想環境を作成しました。
mkdir vertexai_pipeline_bigquery cd vertexai_pipeline_bigquery pyenv local 3.10.13 python -m venv vertexai_pipeline_bigquery
仮想環境を有効化し、Kubeflow Pipelines SDKとGCPCをインストールしました。
source vertexai_pipeline_bigquery/bin/activate pip install kfp==2.4.0 pip install google-cloud-pipeline-components==2.6.0
3. BigQueryの準備
asia-northeast1
にkubeflow_dataset
という名前の空のデータセットを作成しました。
このデータセットに、bigquery-public-data.ml_datasets.penguins
のデータを東京リージョンに移してきたpenguins
テーブルを作成しました。
4. 作成したコード
以下のコードを作成しました。
from kfp import compiler from kfp import dsl from bqml_utils import * def main(): @dsl.pipeline def bq_hello_pipeline(): from google_cloud_pipeline_components.v1.bigquery import ( BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp, BigqueryPredictModelJobOp, BigqueryQueryJobOp) # run preprocessing job bq_preprocess_op = BigqueryQueryJobOp( query=generate_create_bq_preprocess_query(), project=get_project(), location="asia-northeast1", ) # create the linear regression bq_model_op = BigqueryCreateModelJobOp( query=generate_create_bq_model_query(), project=get_project(), location="asia-northeast1", ).after(bq_preprocess_op) # evaluate the linear regression bq_evaluate_op = BigqueryEvaluateModelJobOp( project=get_project(), location="asia-northeast1", model=bq_model_op.outputs["model"] ).after(bq_model_op) # similuate prediction BigqueryPredictModelJobOp( model=bq_model_op.outputs["model"], query_statement=generate_create_bq_prediction_query(), job_configuration_query=get_predict_job_config(), project=get_project(), location="asia-northeast1", ).after(bq_evaluate_op) compiler.Compiler().compile(bq_hello_pipeline, 'pipeline.yaml') if __name__ == "__main__": main()
google_cloud_pipeline_components.v1.bigquery
より、使用するコンポーネントをインポートしました。
BigqueryPredictModelJobOp
は作成した推論結果を保存するテーブルを、job_configuration_query
で指定しました。
また、実行するSQL文などを扱いやすくするため、以下のスクリプトに必要な関数を分けました。長めのスクリプトを2つ並べると説明が見にくくなるのでトグルメニューに隠しておきます。
bqml_utils.py
PROJECT_ID = "自分のGoogleCloudプロジェクトID" BQ_DATASET = "kubeflow_dataset" RAW_DATA_TABLE = "penguins" PREPROCESSED_TABLE = "penguins_processed" CLASSIFICATION_MODEL_NAME = "penguins_model" PREDICT_RESULT_TABLE = "penguins_predict_result" def get_project(): """ プロジェクトIDを返す """ return PROJECT_ID def get_predict_job_config(): """ 推定結果を格納するテーブルの情報をジョブ設定として返す """ job_config = { "destinationTable": { "projectId": PROJECT_ID, "datasetId": BQ_DATASET, "tableId": PREDICT_RESULT_TABLE, } } return job_config def generate_create_bq_preprocess_query(): """ 前処理のためのSQLを返す """ create_bq_preprocess_query = f""" -- create the splitted table CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}` AS ( WITH data_with_split_flag AS ( SELECT species, island, culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g, sex, RAND() AS split_flag FROM `{PROJECT_ID}.{BQ_DATASET}.{RAW_DATA_TABLE}` WHERE body_mass_g IS NOT NULL ) SELECT species, island, culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g, sex, CASE WHEN split_flag < 0.8 THEN 'TRAIN' ELSE 'PREDICT' END AS split FROM data_with_split_flag ) """ return create_bq_preprocess_query def generate_create_bq_model_query(): """ モデル作成を実行するSQLを返す """ create_bq_model_query = f""" CREATE OR REPLACE MODEL `{PROJECT_ID}.{BQ_DATASET}.{CLASSIFICATION_MODEL_NAME}` OPTIONS ( model_type='linear_reg', input_label_cols=['body_mass_g']) AS SELECT species, island, culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g, sex FROM `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}` WHERE split = 'TRAIN'; """ return create_bq_model_query def generate_create_bq_prediction_query(): """ モデルで推定する対象のデータを表すSQLを返す """ create_bq_prediction_query = f""" SELECT species, island, culmen_length_mm, culmen_depth_mm, flipper_length_mm, sex FROM `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}` WHERE split = 'PREDICT' """ return create_bq_prediction_query
5. IR YAMLの生成
以下のように、create_pipeline_yaml.py
を実行してIR YAMLを生成しました。
python create_pipeline_yaml.py
意図通りに実行できれば、スクリプトと同じディレクトリにpipeline.yaml
ができます。
tree -L 1 # . # ├── bqml_utils.py # ├── create_pipeline_yaml.py # ├── pipeline.yaml # └── vertexai_pipeline_bigquery
パイプラインの実行
1. Vertex AI Pipelinesで実行
pipeline.yaml
をVertex AI Pipelinesにアップロードして実行を作成しました。
まず、実行を作成
を押して、実行のソース
をファイルをアップロード
とし、pipeline.yaml
をアップロードしました。
Cloud Storage のロケーション
として適当な場所を指定して、送信
しました。
以下のようにパイプラインの実行が始まりました。
前処理のステップのパイプライン実行分析を見ると、以下のように入力パラメータとしてどんなクエリを実行したのかが分かるようになっていました。
このように、全てのステップが成功しました。
2. BigQueryでの結果確認
データセットを確認すると、以下のようにリソースができていることを確認できました。
推論結果も出力されていました。この出力を使って、機械学習機能以降の処理を行うわけですね。
最後に
Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成する例のご紹介でした。
Vertex AI Pipelinesは定期実行もサポートしているので、BQMLの実行はこの機能からするのがよさそうだなと思いました。
『コンポーネントでアーティファクトを使用または生成する』に記載のように、Google Cloud PipelineコンポーネントSDKは、コンポーネントの入力および出力として機能する一連のMLメタデータアーティファクトタイプを定義でき、Vertex AIメタデータでリソースの追跡をできる点も大きなポイントですね。